package com.Hsu.mq.mqclient;

/**
 * @author Hsu琛君珩
 * @ClassName:Channel
 * @date 2024-03-03
 * @apiNote
 * @Version: v1.0
 */

import com.Hsu.mq.common.*;
import com.Hsu.mq.mqserver.core.BasicProperties;
import com.Hsu.mq.mqserver.core.ExchangeType;
import lombok.Data;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 表示一个逻辑上的连接，还需要提供一系列的方法去和服务器提供的核心 API 对应
 * 客户端提供的这些方法，方法的内部就是发了一个特定的请求
 * 一个客户端可以有多个模块，每个模块都可以和 brokerServer 之间建立 “逻辑上的连接” (channel)
 * 但这几个模块的 channel 是互不影响的，但这几个 channel 复用了同一个 TCP 连接
 */
@Data
public class Channel {
    private String channelId;
    //当前这个 channel 属于哪个连接
    private Connection connection;
    //用来存储后续客户端收到的服务器的响应
    private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
    //如果当前 channel 订阅了某个队列，就需要在此处记录下对应回调是什么，当该队列的消息返回回来时，调用回调
    //此处约定一个 Channel 中只能有一个回调
    private Consumer consumer = null;

    public Channel(String channelId, Connection connection) {
        this.channelId = channelId;
        this.connection = connection;
    }

    /**
     * 在这个方法中，和服务器交互，告知服务器，此处客户端创建了新的 channel
     * @return
     * @throws IOException
     */
    public boolean createChannel() throws IOException {
        //对于创建 channel 来说，此时 payload 就是一个 basicArguments 对象
        BasicArguments basicArguments = new BasicArguments();
        basicArguments.setChannelId(channelId);
        basicArguments.setRid(generateRid());
        byte[] payload = BinaryTool.toBytes(basicArguments);

        Request request = new Request();
        request.setType(0x1);
        request.setLength(payload.length);
        request.setPayload(payload);

        //构造出完整请求后，就可以发送这个请求了
        connection.writeRequest(request);
        //等待服务器的响应
        BasicReturns basicReturns = waitResult(basicArguments.getRid());
        return basicReturns.isOk();
    }

    /**
     * 期望使用这个方法来阻塞等待服务器的响应
     * 客户端一个 Connection 下可能有多个 channel，此时这些 channel 可能都在给服务器发送请求
     * 而服务器返回响应的顺序不一定和请求的顺序一样的，对应的 channel 发出请求后只需要接受对应的响应，别的是不反应的
     * 弄 basicReturnsMap 这个哈希表就是为了这个目的，客户端统一搞一个线程，用来读取 socket 中收到的所有响应数据，读取之后就把数据放到上述哈希表中（key 是 rid）
     * 客户端其他代码发了请求之后，就可以不停去哈希表中哈找是否存在自己匹配的相遇，是才可以取走，不是就要继续等待
     * @param rid
     * @return
     */
    private BasicReturns waitResult(String rid) {
        BasicReturns basicReturns = null;
        while ((basicReturns = basicReturnsMap.get(rid)) == null) {
            //如果查询结果为 null，就要继续等待
            synchronized (this) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        //读取成功之后，还需要把这个消息从哈希表中删除，避免一直积累导致内存爆炸
        basicReturnsMap.remove(rid);
        return basicReturns;
    }

    /**
     * 唤醒所有线程
     * @param basicReturns
     */
    public void putReturns(BasicReturns basicReturns) {
        basicReturnsMap.put(basicReturns.getRid(), basicReturns);
        synchronized (this) {
            //当前也不知道有多少个线程在阻塞等待上述的响应
            //把所有等待的线程都唤醒
            notifyAll();
        }
    }

    private String generateRid(){
        return "R-"+UUID.randomUUID().toString();
    }

    /**
     * 关闭 channel，给服务器发送一个 type=0x2 的请求
     * @return
     * @throws IOException
     */
    public boolean close() throws IOException {
        BasicArguments basicArguments = new BasicArguments();
        basicArguments.setRid(generateRid());
        basicArguments.setChannelId(channelId);
        byte[] payload = BinaryTool.toBytes(basicArguments);

        Request request = new Request();
        request.setType(0x2);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicArguments.getRid());
        return basicReturns.isOk();
    }

    /**
     * 创建交换机
     * @param exchangeName
     * @param exchangeType
     * @param durable
     * @param autoDelete
     * @param arguments
     * @return
     * @throws IOException
     */
    public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,
                                   Map<String, Object> arguments) throws IOException {
        ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
        exchangeDeclareArguments.setRid(generateRid());
        exchangeDeclareArguments.setChannelId(channelId);
        exchangeDeclareArguments.setExchangeName(exchangeName);
        exchangeDeclareArguments.setExchangeType(exchangeType);
        exchangeDeclareArguments.setDurable(durable);
        exchangeDeclareArguments.setAutoDelete(autoDelete);
        exchangeDeclareArguments.setArguments(arguments);
        byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);

        Request request = new Request();
        request.setType(0x3);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());
        return basicReturns.isOk();
    }

    /**
     * 删除交换机
     * @param exchangeName
     * @return
     * @throws IOException
     */
    public boolean exchangeDelete(String exchangeName) throws IOException {
        ExchangeDeleteArguments arguments = new ExchangeDeleteArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setExchangeName(exchangeName);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0x4);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

    /**
     * 创建队列
     * @param queueName
     * @param durable
     * @param exclusive
     * @param autoDelete
     * @param arguments
     * @return
     * @throws IOException
     */
    public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
                                Map<String, Object> arguments) throws IOException {
        QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
        queueDeclareArguments.setRid(generateRid());
        queueDeclareArguments.setChannelId(channelId);
        queueDeclareArguments.setQueueName(queueName);
        queueDeclareArguments.setDurable(durable);
        queueDeclareArguments.setExclusive(exclusive);
        queueDeclareArguments.setAutoDelete(autoDelete);
        queueDeclareArguments.setArguments(arguments);
        byte[] payload = BinaryTool.toBytes(queueDeclareArguments);

        Request request = new Request();
        request.setType(0x5);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());
        return basicReturns.isOk();
    }

    /**
     * 删除队列
     * @param queueName
     * @return
     * @throws IOException
     */
    public boolean queueDelete(String queueName) throws IOException {
        QueueDeleteArguments arguments = new QueueDeleteArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setQueueName(queueName);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0x6);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

    /**
     * 创建绑定
     * @param queueName
     * @param exchangeName
     * @param bindingKey
     * @return
     * @throws IOException
     */
    public boolean queueBind(String queueName,String exchangeName,String bindingKey) throws IOException {
        QueueBindArguments arguments = new QueueBindArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setQueueName(queueName);
        arguments.setExchangeName(exchangeName);
        arguments.setBindingKey(bindingKey);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0x7);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

    /**
     * 删除绑定
     * @param queueName
     * @param exchangeName
     * @return
     * @throws IOException
     */
    public boolean queueUnbind(String queueName, String exchangeName) throws IOException {
        QueueUnbindArguments arguments = new QueueUnbindArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setQueueName(queueName);
        arguments.setExchangeName(exchangeName);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0x8);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

    /**
     * 发送消息
     * @param exchangeName
     * @param routingKey
     * @param basicProperties
     * @param body
     * @return
     * @throws IOException
     */
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {
        BasicPublishArguments arguments = new BasicPublishArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setExchangeName(exchangeName);
        arguments.setRoutingKey(routingKey);
        arguments.setBasicProperties(basicProperties);
        arguments.setBody(body);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0x9);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

    /**
     * 订阅消息
     * @param queueName
     * @param autoAck
     * @param consumer
     * @return
     */
    public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
        // 先设置回调.
        if (this.consumer != null) {
            throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");
        }
        this.consumer = consumer;

        BasicConsumeArguments arguments = new BasicConsumeArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setConsumerTag(channelId);  // 此处 consumerTag 也使用 channelId 来表示了.
        arguments.setQueueName(queueName);
        arguments.setAutoAck(autoAck);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0xa);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

    /**
     * 确认消息
     * @param queueName
     * @param messageId
     * @return
     */
    public boolean basicAck(String queueName, String messageId) throws IOException {
        BasicAckArguments arguments = new BasicAckArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setQueueName(queueName);
        arguments.setMessageId(messageId);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0xb);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }
}